== Sample 7

This sample demonstrates the application of the new consumer rebalance protocol in Spring for Apache Kafka.

The new consumer rebalance protocol refers to the Server Side rebalance protocol proposed in link:https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol[KIP-848].

`Spring Boot` starts the `Kafka Broker` container defined in the `compose.yaml` file upon startup.

```yaml
version: '3'
services:
  broker:
    image: bitnami/kafka:3.7.0
    ...
      # KIP-848
      KAFKA_CFG_GROUP_COORDINATOR_REBALANCE_PROTOCOLS: "classic,consumer"
      KAFKA_CFG_TRANSACTION_PARTITION_VERIFICATION_ENABLE: "false"
```

The config of `group.protocol = conumser` should be added to `Consumer` configuration to apply new consumer rebalance protocol.

The `group.protocol` can be configured in the `resources/application.yaml` as follows:

```yaml
spring:
  kafka:
    consumer:
      properties:
        group.protocol: consumer
```

Next, the `Consumer` created by `@KafkaListener` will request a subscription to the `test-topic` from the `Broker`.

The `Broker` will then send the Topic Partition Assign information to the `Consumer`. This means that the `Consumer` rebalancing has finished, and the `Consumer` has started to poll messages.

```java
@Component
public class Sample07KafkaListener {

	@KafkaListener(topics = "test-topic", groupId = "sample07-1")
	public void listenWithGroup1(String message) {
		System.out.println("Received message at group sample07-1: " + message);
	}

	@KafkaListener(topics = "test-topic", groupId = "sample07-2")
	public void listenWithGroup2(String message) {
		System.out.println("Received message at group sample07-2: " + message);
	}
}
```
